home *** CD-ROM | disk | FTP | other *** search
/ Freelog 125 / Freelog_MarsAvril2015_No125.iso / Internet / gpodder / gpodder-3.8.3-setup.exe / {app} / src / mygpoclient / http_test.py < prev    next >
Text File  |  2013-02-08  |  7KB  |  196 lines

  1. # -*- coding: utf-8 -*-
  2. # gpodder.net API Client
  3. # Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program.  If not, see <http://www.gnu.org/licenses/>.
  17.  
  18. from mygpoclient import http
  19.  
  20. import unittest
  21. import multiprocessing
  22. import BaseHTTPServer
  23.  
  24. def http_server(port, username, password, response):
  25.     storage = {}
  26.     class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
  27.         def __init__(self, *args, **kwargs):
  28.             BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
  29.  
  30.         def _checks(self):
  31.             if not self._check_auth():
  32.                 return False
  33.             elif not self._check_errors():
  34.                 return False
  35.             else:
  36.                 return True
  37.  
  38.         def _check_auth(self):
  39.             if self.path.startswith('/auth'):
  40.                 authorization = self.headers.get('authorization', None)
  41.                 if authorization is not None:
  42.                     auth_type, credentials = authorization.split(None, 1)
  43.                     if auth_type.lower() == 'basic':
  44.                         auth_user, auth_pass = credentials.decode('base64').split(':', 1)
  45.                         if username == auth_user and password == auth_pass:
  46.                             return True
  47.  
  48.                 self.send_response(401)
  49.                 self.send_header('WWW-Authenticate', 'Basic realm="Fake HTTP Server"')
  50.                 self.end_headers()
  51.                 self.wfile.close()
  52.                 return False
  53.  
  54.             return True
  55.  
  56.         def _check_errors(self):
  57.             if self.path.startswith('/badrequest'):
  58.                 self.send_response(400)
  59.                 self.end_headers()
  60.                 self.wfile.close()
  61.                 return False
  62.             elif self.path.startswith('/notfound'):
  63.                 self.send_response(404)
  64.                 self.end_headers()
  65.                 self.wfile.close()
  66.                 return False
  67.             elif self.path.startswith('/invaliderror'):
  68.                 self.send_response(444)
  69.                 self.end_headers()
  70.                 self.wfile.close()
  71.                 return False
  72.  
  73.             return True
  74.  
  75.         def do_POST(self):
  76.             if not self._checks():
  77.                 return
  78.  
  79.             input_data = self.rfile.read(int(self.headers.get('content-length')))
  80.             self.send_response(200)
  81.             self.end_headers()
  82.             self.wfile.write(input_data.encode('rot13'))
  83.             self.wfile.close()
  84.  
  85.         def do_PUT(self):
  86.             if not self._checks():
  87.                 return
  88.  
  89.             input_data = self.rfile.read(int(self.headers.get('content-length')))
  90.             storage[self.path] = input_data
  91.             self.send_response(200)
  92.             self.end_headers()
  93.             self.wfile.write('PUT OK')
  94.             self.wfile.close()
  95.  
  96.         def do_GET(self):
  97.             if not self._checks():
  98.                 return
  99.  
  100.             self.send_response(200)
  101.             self.end_headers()
  102.             if self.path in storage:
  103.                 self.wfile.write(storage[self.path])
  104.             else:
  105.                 self.wfile.write(response)
  106.             self.wfile.close()
  107.  
  108.         def log_request(*args):
  109.             pass
  110.  
  111.     BaseHTTPServer.HTTPServer(('127.0.0.1', port), Handler).serve_forever()
  112.  
  113. class Test_HttpClient(unittest.TestCase):
  114.     USERNAME = 'john'
  115.     PASSWORD = 'secret'
  116.     PORT = 9876
  117.     URI_BASE = 'http://localhost:%(PORT)d' % locals()
  118.     RESPONSE = 'Test_GET-HTTP-Response-Content'
  119.     DUMMYDATA = 'fq28cnyp3ya8ltcy;ny2t8ay;iweuycowtc'
  120.  
  121.     def setUp(self):
  122.         self.server_process = multiprocessing.Process(target=http_server,
  123.                 args=(self.PORT, self.USERNAME, self.PASSWORD, self.RESPONSE))
  124.         self.server_process.start()
  125.         import time
  126.         time.sleep(.1)
  127.  
  128.     def tearDown(self):
  129.         self.server_process.terminate()
  130.         import time
  131.         time.sleep(.1)
  132.  
  133.     def test_UnknownResponse(self):
  134.         client = http.HttpClient()
  135.         path = self.URI_BASE+'/invaliderror'
  136.         self.assertRaises(http.UnknownResponse, client.GET, path)
  137.  
  138.     def test_NotFound(self):
  139.         client = http.HttpClient()
  140.         path = self.URI_BASE+'/notfound'
  141.         self.assertRaises(http.NotFound, client.GET, path)
  142.  
  143.     def test_Unauthorized(self):
  144.         client = http.HttpClient('invalid-username', 'invalid-password')
  145.         path = self.URI_BASE+'/auth'
  146.         self.assertRaises(http.Unauthorized, client.GET, path)
  147.  
  148.     def test_BadRequest(self):
  149.         client = http.HttpClient()
  150.         path = self.URI_BASE+'/badrequest'
  151.         self.assertRaises(http.BadRequest, client.GET, path)
  152.  
  153.     def test_GET(self):
  154.         client = http.HttpClient()
  155.         path = self.URI_BASE+'/noauth'
  156.         self.assertEquals(client.GET(path), self.RESPONSE)
  157.  
  158.     def test_authenticated_GET(self):
  159.         client = http.HttpClient(self.USERNAME, self.PASSWORD)
  160.         path = self.URI_BASE+'/auth'
  161.         self.assertEquals(client.GET(path), self.RESPONSE)
  162.  
  163.     def test_unauthenticated_GET(self):
  164.         client = http.HttpClient()
  165.         path = self.URI_BASE+'/auth'
  166.         self.assertRaises(http.Unauthorized, client.GET, path)
  167.  
  168.     def test_POST(self):
  169.         client = http.HttpClient()
  170.         path = self.URI_BASE+'/noauth'
  171.         self.assertEquals(client.POST(path, self.DUMMYDATA), self.DUMMYDATA.encode('rot13'))
  172.  
  173.     def test_authenticated_POST(self):
  174.         client = http.HttpClient(self.USERNAME, self.PASSWORD)
  175.         path = self.URI_BASE+'/auth'
  176.         self.assertEquals(client.POST(path, self.DUMMYDATA), self.DUMMYDATA.encode('rot13'))
  177.  
  178.     def test_unauthenticated_POST(self):
  179.         client = http.HttpClient()
  180.         path = self.URI_BASE+'/auth'
  181.         self.assertRaises(http.Unauthorized, client.POST, path, self.DUMMYDATA)
  182.  
  183.     def test_PUT(self):
  184.         client = http.HttpClient()
  185.         path = self.URI_BASE+'/noauth'
  186.         self.assertEquals(client.PUT(path, self.DUMMYDATA), 'PUT OK')
  187.  
  188.     def test_GET_after_PUT(self):
  189.         client = http.HttpClient()
  190.         for i in range(10):
  191.             path = self.URI_BASE + '/file.%(i)d.txt' % locals()
  192.             client.PUT(path, self.RESPONSE + str(i))
  193.             self.assertEquals(client.GET(path), self.RESPONSE + str(i))
  194.  
  195.  
  196.